#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/mman.h>
#include <dirent.h>
#include <math.h>

#define DBG_SUBSYS S_LIBTASK

#include "atomic.h"
#include "adt.h"
#include "dbg.h"
#include "conf.h"
#include "utils.h"
#include "system.h"
#include "lich_md.h"
#include "nodectl.h"
#include "balance2.h"
#include "recovery.h"
#include "sysy_lib.h"
#include "fileinfo.h"
#include "configure.h"
#include "rate_probe.h"
#include "token_bucket.h"
#include "disk.h"
#include "../../storage/controller/stor_ctl.h"
#include "../../storage/controller/volume_ctl.h"
#include "timepoint.h"

/**
 * @file Data Balancer
 *
 * @todo Error handling: volume deleted
 * @todo Error handling: pool deleted
 * @todo Handle a vctl joining while balancing is in progress
 * @todo Bandwidth control
 * @todo support fault set
 * @todo Output: speed, progress
 * @todo Control queue length
 * @todo Concurrency
 * @todo Configuration management
 * @todo Policy
 *
 * @todo Related events: recovery process
 * @todo Logging
 * @todo tool
 *
 * @config /opt/fusionstack/data/balance/{interval,threshold,thread,step,fill_rate}
 *
 * @config /dev/shm/lich4/nodectl/balance/runlevel
 *
 * @out /dev/shm/lich4/nodectl/balance/<pool>/info
 *
 */

#define DRY_RUN 0

#define RUNLEVEL_PATH "balance/runlevel"

#define ACTOR_MAX 10

typedef enum {
        __RL_OFF__   = 0,
        __RL_START__ = 1,
        __RL_STOP__  = 2,
} runlevel_t;

/**
 * Utilisation snapshot of one cluster node for the pool being balanced.
 * Filled by __node_list_get().
 */
typedef struct {
        nid_t nid;
        uint32_t type;
        uint32_t status;

        uint64_t used;          // used space counted for this pool by dfree_count2()
        uint64_t total;         // total space counted for this pool by dfree_count2()
        double percent;         // used / total, or 0.0 when total == 0

        // raw info: decoded disk-free info of the node (dfinfo_decode)
        nodedfree_t dfree;

        char node[HOST_NAME_MAX];       // node name
        char rack[HOST_NAME_MAX];       // rack name derived from the node name by disk2rack()
} b_node_info_t;

/**
 * Node snapshot of one pool together with its operations.
 * After get(), nodes[0..node_count) is sorted ascending by percent and
 * nid_map[] maps a node id to its entry inside nodes[].
 */
typedef struct __node_list_t {
        int node_count;
        b_node_info_t nodes[LICH_NODE_MAX];
        b_node_info_t *nid_map[LICH_NODE_MAX];

        void *_pool;            // owning b_pool_t

        int (*get)(struct __node_list_t *nlist);        // refresh the snapshot from the cluster
        void (*dump)(struct __node_list_t *nlist);      // debug-log the snapshot
        void (*reset)(struct __node_list_t *nlist);     // clear every entry
} b_node_list_t;

/**
 * Candidate nodes for one chunk's replicas: bitmap[i] == 1 marks nodes[i]
 * as a member of the replica set handed to __job_create().
 */
typedef struct {
        int node_count;
        b_node_info_t *nodes[LICH_REPLICA_MAX * 2];
        int bitmap[LICH_REPLICA_MAX * 2];
} nodeset_t;

/*
 * Placement constraint derived by __get_fs_level():
 * RACK - replicas of a chunk must sit on different racks (checked by is_node_ok());
 * NODE - the rack constraint is not enforced.
 */
typedef enum {
        __FAULT_SET_RACK__,
        __FAULT_SET_NODE__,
} b_faultset_level_t;

/*
 * Per-volume state. Only __S_VOL_INIT volumes are counted in pool->volume_count
 * (see __vol_set_status()). DELETED is set by __pool_check_deleted();
 * __pool_scan__ re-initialises only REMCHG entries.
 * NOTE(review): DONE/REMCHG are not assigned in the visible code.
 */
typedef enum {
        __S_VOL_INIT = 0,
        __S_VOL_DELETED,
        __S_VOL_DONE,
        __S_VOL_REMCHG,
} b_vol_status_t;

/* One volume tracked by a pool (element of pool->volume_list). */
typedef struct {
        struct list_head hook;

        chkid_t chkid;
        uint64_t size;          // chunk count, from size2chknum()
        int repnum;             // target replica count (fileinfo.repnum_usr)
        // int chknum;
        int localize;           // result of is_volume_localize()

        b_vol_status_t status;

        generator_t gen;        // chunk id generator initialised over `size` chunks

        void *_pool;            // owning b_pool_t
} b_vol_info_t;

/* One chunk move queued to an actor. */
typedef struct {
        struct list_head hook;

        chkid_t parent;                 // volume that owns the chunk
        chkinfo_t *chkinfo;             // points into _chkinfo
        char _chkinfo[CHKINFO_MAX];     // inline copy of the chunk info
        nid_t newrep[LICH_REPLICA_MAX]; // target replica set (chkinfo->repnum entries)
} b_job_t;

/* Actor thread lifecycle: STOP is a request, STOPPED is the acknowledgement. */
typedef enum {
        __TH_STOPPED__,
        __TH_RUNNING__,
        __TH_STOP__,
} b_thread_status_t;

/**
 * worker thread: executes the b_job_t entries queued by the pool.
 */
typedef struct __actor_t {
        sy_rwlock_t lock;        ///< protects queue
        count_list_t queue;      ///< list of b_job_t

        int _inited;             ///< set once init() has run
        int _idx;                ///< index used in log messages
        int _status;             ///< b_thread_status_t
        int _stop_if_enoent;     ///< graceful stop: exit once the queue is empty
        void *_pool;             ///< owning b_pool_t

        int (*init)(struct __actor_t *actor);
        int (*push)(struct __actor_t *actor, b_job_t *job);
        int (*pop)(struct __actor_t *actor, b_job_t **job);
} actor_t;

/* Pool (and module) balancing status. */
typedef enum {
        __S_STOPPED__ = 0,
        __S_SCAN__,
        __S_RUNNING__,
        __S_WAITING__,
        __S_WAITING_RECOVERY__,
        __S_STOP__,
        __S_FAILED__,
        __S_DONE__,
        __S_UNKNOWN__,
} b_status_t;

/* Counter selector passed to b_pool_t.inc_count(). */
enum {
        __T_COUNT_CHECK__   = 0,
        __T_COUNT_PUSH__    = 1,
        __T_COUNT_SUCCESS__ = 2,
        __T_COUNT_FAIL__    = 3,
};

/**
 * One storage pool being balanced: tracked volumes, node snapshot, worker
 * actors and progress counters. __pool_main_thread_proc() drives scanning
 * and job generation; the actors execute the queued jobs.
 */
typedef struct __pool_t {
        struct list_head hook;

        char name[MAX_NAME_LEN];

        count_list_t volume_list;   ///< list of b_vol_info_t
        int volume_count;           ///< number of __S_VOL_INIT volumes (maintained by __vol_set_status)

        b_node_list_t node_list;    ///< latest node snapshot

        double average;   ///< mean node utilisation (used/total), computed by node_list.get()

        actor_t actors[ACTOR_MAX];
        int actor_count;            ///< number of actors[] entries in use
        uint64_t hash;              // NOTE(review): not referenced in the visible code

        b_status_t status;

        // stat: presumably updated through inc_count() with __T_COUNT_* — confirm
        uint64_t check;
        uint64_t total;
        uint64_t success;
        uint64_t fail;

        uint64_t speed;             ///< last rate reported by trange

        b_faultset_level_t fs_level; ///< recomputed from the node snapshot each iteration

        timerange1_t trange;

        token_bucket_t bucket;      ///< rate limit: actors consume one token per job

        sem_t sem;                  ///< posted by the main thread as soon as it starts
        sy_rwlock_t count_lock;     // presumably guards the counters — confirm
        sy_rwlock_t dump_lock;      // presumably guards info dumping — confirm

        int (*destroy)(struct __pool_t *pool);

        int (*start_main_thread)(struct __pool_t *pool);

        int (*start_actors)(struct __pool_t *pool);
        int (*stop_actors)(struct __pool_t *pool, int force);
        int (*is_actors_stopped)(struct __pool_t *pool);

        int (*scan)(struct __pool_t *pool);

        int (*is_saturated)(struct __pool_t *pool);
        int (*step)(struct __pool_t *pool, int step);
        int (*push)(struct __pool_t *pool, b_job_t *job);

        int (*set_status)(struct __pool_t *pool, b_status_t status);

        int (*inc_count)(struct __pool_t *pool, int t, int n);
        // int (*inc_success)(struct __pool_t *pool, int n);
        int (*done)(struct __pool_t *pool);

        int (*wakeup)(struct __pool_t *pool);
} b_pool_t;

/* Balancer configuration (see the @config paths in the file header). */
typedef struct {
        int interval;           // NOTE(review): not referenced in the visible code
        int threshold;          // utilisation spread tolerated, in percentage points (used as threshold / 100.0)
        int actor_num;          // expected number of actors per pool
        int step;               // argument passed to pool->step()

        nodectl_option_t opt_fill_rate;  // token bucket fill rate, re-applied by the main thread
} b_config_t;

/* Module-wide balancer state. */
typedef struct __module_t {
        sem_t sem;

        hashtable_t pools;      // presumably b_pool_t keyed by pool name — confirm

        b_status_t status;

        b_config_t config;

        nodectl_option_t opt_runlevel;

        // opts
        int (*stop)();

        int (*load_config)(struct __module_t *module, int log);
} b_module_t;


/* Singleton module instance; read by the pool and actor threads. */
static b_module_t *__module__ = NULL;

/* Forward declarations for helpers defined further down. */
static const char *__status2str__(b_status_t status);
static int __pool_check_deleted(b_pool_t *pool, count_list_t *volist);
static inline int __pool_volume_size(b_pool_t *pool);
static void __vol_set_status(b_pool_t *pool, b_vol_info_t *vol, b_vol_status_t status);

// @class node

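/*
 * The node list is refreshed and sorted by node utilisation.
 * Later code indexes into the node list and depends strictly
 * on the result of that sort.
 */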
static int node_cmp(const void * a, const void *b)
{
        return ( (((b_node_info_t *)a)->percent - ((b_node_info_t *)b)->percent) > 0 ? 1 : -1 );
}

static int percent_cmp(const void * a, const void *b)
{
        b_node_info_t *_a = *(b_node_info_t **)a;
        b_node_info_t *_b = *(b_node_info_t **)b;

        return _a->percent - _b->percent < 0 ? 1 : -1;
}

/**
 * Refresh @nlist from the cluster and recompute the owning pool's average.
 *
 * For every cluster node this decodes its disk-free info and counts the
 * space used/total for this pool (dfree_count2). From those it derives the
 * utilisation ratio. The array is then sorted ascending by utilisation
 * and nid_map[] is rebuilt to point at the sorted entries.
 *
 * @return 0 when the pool needs balancing;
 *         EPERM when (max - min) utilisation is within config.threshold
 *               (already balanced; the check is compiled out when DRY_RUN);
 *         EAGAIN when the cluster reported no nodes;
 *         otherwise the error from cluster_get_nodes().
 */
static int __node_list_get(b_node_list_t *nlist)
{
        int i, ret;
        vec_node_t v;
        cluster_node_t *val;
        nodeinfo_t *info;
        b_node_info_t *node;
        b_pool_t *pool = nlist->_pool;

        nlist->reset(nlist);

        vec_init(&v);
        ret = cluster_get_nodes(&v, FALSE);
        if (unlikely(ret)) {
                vec_destroy(&v);
                GOTO(err_ret, ret);
        }

        vec_foreach(&v, val, i) {
                /* nodes[] is a fixed-size array; never write past its end. */
                if (unlikely(nlist->node_count >= LICH_NODE_MAX)) {
                        DWARN("pool %s node list full, ignore the rest\n", pool->name);
                        break;
                }

                info = &val->nodeinfo;
                node = &nlist->nodes[nlist->node_count++];

                node->nid.id = info->stat->nid.id;
                node->status = info->stat->status;
                node->type = info->stat->type;

                // TODO group by pool
                dfinfo_decode(info->dfinfo, strlen(info->dfinfo), &node->dfree);
                dfree_count2(&node->dfree, pool->name, &node->used, &node->total);
                node->percent = (node->total == 0 ? 0.0 : 1.0 * node->used/node->total);

                /* Bounded copy: node->node is only HOST_NAME_MAX bytes. */
                snprintf(node->node, sizeof(node->node), "%s", info->nodename);
                disk2rack(info->nodename, node->rack);
        }

        vec_destroy(&v);

        /* Without nodes there is nothing to average, and nodes[node_count - 1]
         * below would index before the array. */
        if (unlikely(nlist->node_count == 0)) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        qsort(nlist->nodes, nlist->node_count, sizeof(b_node_info_t), node_cmp);

        pool->average = 0;
        for (i=0; i < nlist->node_count; i++) {
                node = &nlist->nodes[i];
                // NOTE(review): assumes nid.id < LICH_NODE_MAX — confirm
                nlist->nid_map[node->nid.id] = node;
                /* percent is 0 when total == 0, so this cannot produce a NaN average. */
                pool->average += node->percent;
        }
        pool->average /= nlist->node_count;

        nlist->dump(nlist);

#if !DRY_RUN
        /* Sorted ascending: last - first is the utilisation spread. */
        if (nlist->nodes[nlist->node_count -1].percent - nlist->nodes[0].percent <= __module__->config.threshold/100.0) {
                DINFO("pool %s already balance\n", pool->name);
                ret = EPERM;
                GOTO(err_ret, ret);
        }
#endif

        return 0;
err_ret:
        return ret;
}

/* Forget the previous snapshot: empty the list and clear both arrays. */
static void __node_list_reset(b_node_list_t *nlist)
{
        memset(nlist->nid_map, 0, sizeof(nlist->nid_map));
        memset(nlist->nodes, 0, sizeof(nlist->nodes));
        nlist->node_count = 0;
}

/* Debug-log every node of the snapshot, then the pool-wide average. */
static void __node_list_dump(b_node_list_t *nlist)
{
        const b_pool_t *owner = nlist->_pool;
        const b_node_info_t *cur = nlist->nodes;
        const b_node_info_t *end = nlist->nodes + nlist->node_count;

        for (; cur < end; cur++) {
                DBUG("pool %s node %s/%d rack %s used %ju total %ju percent %0.6f\n",
                     owner->name, cur->node, cur->nid.id, cur->rack, cur->used, cur->total, cur->percent);
        }

        DBUG("pool %s node %d average %0.6f\n", owner->name, nlist->node_count, owner->average);
}

/*
 * Count the distinct rack names among node_list[0..count).
 * A node contributes only if no earlier node shares its rack.
 * Returns 0 for count <= 0.
 */
static int __rack_diff(b_node_info_t *node_list[], int count)
{
        int racks = 0;

        for (int i = 0; i < count; i++) {
                int seen = 0;

                for (int j = 0; j < i; j++) {
                        if (strcmp(node_list[j]->rack, node_list[i]->rack) == 0) {
                                seen = 1;
                                break;
                        }
                }

                if (!seen)
                        racks++;
        }

        return racks;
}

/**
 * Fault-set rules:
 * 1. When the pool's nodes span only one fault set (rack), the fault set
 *    is degraded: several replicas may live in the same fault set.
 *
 * 2. When they span more than one fault set, the fault set is healthy:
 *    replicas strictly follow the fault-set rule. If there are fewer fault
 *    sets than replicas, only that many replicas are placed; recovery fills
 *    in the rest once conditions allow.
 *
 * Returns __FAULT_SET_RACK__ when more than one distinct rack is present,
 * otherwise __FAULT_SET_NODE__.
 */
static b_faultset_level_t __get_fs_level(b_node_info_t *nlist[], int count)
{
        return (__rack_diff(nlist, count) > 1) ? __FAULT_SET_RACK__ : __FAULT_SET_NODE__;
}

// @class volume

/*
 * Allocate a b_vol_info_t for @chkid in state __S_VOL_INIT.
 * The caller links it into the pool and accounts for it.
 * @filestat is currently unused. On failure *vol is NULL and the
 * ymalloc() error is returned.
 */
static int __volume_create(b_vol_info_t **vol, b_pool_t *pool, const chkid_t *chkid,
                           fileinfo_t *fileinfo, filestat_t *filestat)
{
        int ret;
        b_vol_info_t *nv;

        (void)filestat;

        *vol = NULL;

        ret = ymalloc((void **)&nv, sizeof(b_vol_info_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        nv->status = __S_VOL_INIT;
        nv->chkid = *chkid;
        nv->repnum = fileinfo->repnum_usr;                      /* target replica count */
        nv->size = size2chknum(fileinfo->size, &fileinfo->ec);  /* chunk count */
        nv->localize = is_volume_localize(fileinfo);            /* may be refined later */

        /* Generator walks the volume's chunks. */
        chkid_generator_init(&nv->gen, nv->size, FILE_PROTO_EXTERN_ITEM_COUNT);

        nv->_pool = pool;

        *vol = nv;
        return 0;
err_ret:
        return ret;
}

/**
 * Register (or re-activate) one volume of @pool for balancing.
 *
 * @param vol     out: the newly created volume, or NULL when @old_vol is
 *                reused or on error
 * @param old_vol existing entry for @chkid (must be __S_VOL_REMCHG), or NULL
 * @param chkid   volume id
 * @param pool    owning pool
 *
 * @return 0 on success; ECANCELED when the volume carries
 *         __FILE_ATTR_DELETE__; EPERM when CHKINFO_FIRSTNID(chkinfo) is not
 *         the local node; otherwise the error of md_getattr(),
 *         md_chunk_getinfo() or __volume_create().
 */
static int __vol_init(b_vol_info_t **vol, b_vol_info_t *old_vol, const chkid_t *chkid, b_pool_t *pool)
{
        int ret, deleting, retry;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];
        fileinfo_t fileinfo;
        filestat_t filestat;    // NOTE(review): never filled; __volume_create() ignores it

        DBUG("pool %s vol "CHKID_FORMAT" start\n", pool->name, CHKID_ARG(chkid));

        *vol = NULL;

        retry = 0;
retry:
        ret = md_getattr(chkid, &fileinfo);
        if (unlikely(ret)) {
                // EAGAIN is retried via USLEEP_RETRY (presumably up to 50 times, 100ms apart — confirm macro)
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        // volumes being deleted are not balanced
        deleting = fileinfo.attr & __FILE_ATTR_DELETE__;
        if (unlikely(deleting)) {
                ret = ECANCELED;
                GOTO(err_ret, ret);
        }

        chkinfo = (void *)_chkinfo;
        ret = md_chunk_getinfo(pool->name, NULL, chkid, chkinfo, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // only volumes whose first replica is on this node are handled here
        if (!net_islocal(CHKINFO_FIRSTNID(chkinfo))) {
                ret = EPERM;
                // DWARN(""CHKID_FORMAT" not local volume\n", CHKID_ARG(chkid));
                goto err_ret;
        }

#if 0
        // @note use __table1_stat, filestat.sparse is allocated chknum
        ret = stor_ctl_stat(chkid, &filestat, 0 , 0);
        if (unlikely(ret)) {
                // ESTALE
                // DWARN("ret %d\n", ret);
                GOTO(err_ret, ret);
        }

        if (filestat.sparse <= 0) {
                ret = EPERM;
                DWARN(""CHKID_FORMAT" volume is empty!\n", CHKID_ARG(chkid));
                GOTO(err_ret, ret);
        }
#endif

        if (old_vol) {
                // re-activate an existing entry; *vol stays NULL
                YASSERT(old_vol->status == __S_VOL_REMCHG);

                DINFO("pool %s vol "CHKID_FORMAT" from REMCHG to INIT\n", pool->name, CHKID_ARG(&chkinfo->id));
                __vol_set_status(pool, old_vol, __S_VOL_INIT);
        } else {
                ret = __volume_create(vol, pool, chkid, &fileinfo, &filestat);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                YASSERT(*vol != NULL);
                count_list_add_tail(&(*vol)->hook, &pool->volume_list);

                // the new volume is INIT but not yet counted in pool->volume_count
                __vol_set_status(pool, *vol, __S_VOL_INIT);

                DINFO("pool %s vol "CHKID_FORMAT"\n", pool->name, CHKID_ARG(&chkinfo->id));
        }

        return 0;
err_ret:
        return ret;
}

// @class job

/*
 * Build a job that moves @chkinfo (a chunk of volume @parent) onto the
 * nodes selected in @nodeset (bitmap[i] == 1).
 *
 * The selected node count must equal chkinfo->repnum. Bounds are asserted
 * before each write so that a malformed nodeset aborts cleanly instead of
 * overrunning newrep[] / _chkinfo[].
 *
 * @return 0 and *job on success; the ymalloc() error otherwise (*job NULL).
 */
static int __job_create(b_job_t **job, chkid_t *parent, chkinfo_t *chkinfo, nodeset_t *nodeset)
{
        int ret, rep;
        b_job_t *_job;

        *job = NULL;

        ret = ymalloc((void **)&_job, sizeof(*_job));
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        _job->parent = *parent;
        _job->chkinfo = (chkinfo_t *)_job->_chkinfo;
        YASSERT(CHKINFO_SIZE(chkinfo->repnum) <= sizeof(_job->_chkinfo));
        memcpy(_job->chkinfo, chkinfo, CHKINFO_SIZE(chkinfo->repnum));

        rep = 0;
        for (int i = 0; i < nodeset->node_count; i++) {
                if (1 == nodeset->bitmap[i]) {
                        /* Check before writing: newrep[] holds LICH_REPLICA_MAX entries. */
                        YASSERT(rep < LICH_REPLICA_MAX);
                        _job->newrep[rep++].id = nodeset->nodes[i]->nid.id;
                }
        }

        YASSERT(rep == chkinfo->repnum);

        *job = _job;
        return 0;
err_ret:
        return ret;
}

/*
 * Execute one balance job: move the chunk in job->chkinfo to the replica
 * set job->newrep through md_chunk_move().
 *
 * @return 0 on success, and also when the move is refused with ECANCELED,
 *         ESTALE or EREMCHG (presumably the volume was deleted or changed,
 *         so there is nothing left to do); otherwise the md_chunk_move()
 *         error, which the caller counts as a failure.
 */
static int __job_do(b_job_t *job)
{
        int ret;

        chkinfo_t *chkinfo = job->chkinfo;
        chkid_t *chkid = &chkinfo->id;

        ANALYSIS_BEGIN(0);

        //ret = stor_ctl_chunk_move(job->parent, chkid, job->newrep, chkinfo->repnum);
        ret = md_chunk_move(net_getnid(), &job->parent, chkid, job->newrep, chkinfo->repnum);

        /* Record the latency of failed moves as well as successful ones. */
        ANALYSIS_END(0, 1000 * 1000, "md_chunk_move");

        if (unlikely(ret)) {
                /*
                 * Compare each code explicitly; a bare "|| EREMCHG" is always
                 * true and would swallow every failure.
                 */
                if (ret == ECANCELED || ret == ESTALE || ret == EREMCHG) {
                        // TODO if volume is deleted, ret == ECANCELED | ESTALE | EREMCHG
                        GOTO(out, ret);
                } else {
                        DWARN("chk:"CHKID_FORMAT" move fail, ret %d\n", CHKID_ARG(chkid), ret);
                        GOTO(err_ret, ret);
                }
        }

out:
        return 0;
err_ret:
        return ret;
}

// @class actor

/*
 * Initialise the actor's lock and queue and mark it __TH_STOPPED__.
 * Idempotent: a second call on an initialised actor does nothing.
 * Always returns 0 (lock init failure is fatal).
 */
static int __actor_init(actor_t *actor)
{
        int ret;

        if (actor->_inited)
                return 0;

        ret = sy_rwlock_init(&actor->lock, "actor");
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        count_list_init(&actor->queue);

        actor->_status = __TH_STOPPED__;
        actor->_inited = 1;

        return 0;
}

/*
 * Append @job to the tail of the actor's queue under the write lock.
 * __actor_pop() takes from the head, so jobs are served FIFO.
 * Always returns 0 (lock failure is fatal).
 */
static int __actor_push(actor_t *actor, b_job_t *job)
{
        int ret = sy_rwlock_wrlock(&actor->lock);

        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        count_list_add_tail(&job->hook, &actor->queue);
        sy_rwlock_unlock(&actor->lock);

        return 0;
}

/*
 * Detach the first job from the actor's queue.
 *
 * @return 0 with *job set; ENOENT (not logged) when the queue is empty;
 *         the lock error otherwise. *job is NULL on any error.
 */
static int __actor_pop(actor_t *actor, b_job_t **job)
{
        int ret;
        struct list_head *pos, *n;
        b_job_t *head = NULL;

        *job = NULL;

        ret = sy_rwlock_wrlock(&actor->lock);
        if (unlikely(ret))
                GOTO(out1, ret);

        /* Only the first entry is taken; the loop exits after one pass. */
        list_for_each_safe(pos, n, &actor->queue.list) {
                head = list_entry(pos, b_job_t, hook);
                count_list_del_init(pos, &actor->queue);
                break;
        }

        sy_rwlock_unlock(&actor->lock);

        if (head == NULL) {
                /* An empty queue is routine; report it quietly. */
                ret = ENOENT;
                goto out1;
        }

        *job = head;
        return 0;
out1:
        return ret;
}

// @class pool

/*
 * Release the pool's volumes, semaphore, locks and token bucket.
 * Only legal once every actor has stopped and the pool reached __S_DONE__.
 * Always returns 0 (destroy failures are fatal).
 */
static int __pool_destroy(b_pool_t *pool)
{
        int ret;

        YASSERT(pool->is_actors_stopped(pool));
        YASSERT(pool->status == __S_DONE__);

        count_list_free(&pool->volume_list, yfree);

        if (unlikely((ret = sem_destroy(&pool->sem)) != 0))
                UNIMPLEMENTED(__DUMP__);

        if (unlikely((ret = sy_rwlock_destroy(&pool->count_lock)) != 0))
                UNIMPLEMENTED(__DUMP__);

        if (unlikely((ret = sy_rwlock_destroy(&pool->dump_lock)) != 0))
                UNIMPLEMENTED(__DUMP__);

        token_bucket_destroy(&pool->bucket);

        return 0;
}

/**
 * Main balancing loop of one pool (thread entry; @arg is the b_pool_t).
 *
 * Posts pool->sem, scans the pool's volumes and starts the actors. It then
 * loops: refresh speed and fill rate, rescan, wait while recovery runs or
 * pool->is_saturated() holds, check for deleted volumes every 10 passes,
 * refresh the node list, and call pool->step(pool, config.step)
 * (presumably queueing jobs for the actors).
 *
 * The loop ends when a stop is requested (__S_STOP__), when pool->done()
 * reports completion after an empty scan, or when the node list is already
 * balanced (EPERM). The actors are then stopped (forcibly on an explicit
 * stop) and the pool is marked __S_DONE__.
 */
static void *__pool_main_thread_proc(void *arg)
{
        int ret, force = 0;
        uint64_t count = 0;
        b_pool_t *pool = arg;
        b_node_list_t *nlist = &pool->node_list;
        b_node_info_t *nlist2[LICH_NODE_MAX];

        // signal that the thread is up
        sem_post(&pool->sem);

        struct timeval t1, t2;
        int64_t used;
        _gettimeofday(&t1, NULL);

        pool->set_status(pool, __S_SCAN__);

        ret = pool->scan(pool);
        if (unlikely(ret))
                GOTO(out1, ret);

        YASSERT(pool->volume_list.count > 0);
        YASSERT(pool->actor_count > 0);

        pool->start_actors(pool);

        pool->set_status(pool, __S_RUNNING__);

        while (1) {
                DBUG("pool %s status %s\n", pool->name, __status2str__(pool->status));

                // explicit stop: abandon queued jobs
                if (pool->status == __S_STOP__) {
                        DINFO("pool %s stop\n", pool->name);
                        force = 1;
                        break;
                }

                if (timerange1_update(&pool->trange, pool->success)) {
                        pool->speed = pool->trange.speed;
                        pool->set_status(pool, __S_UNKNOWN__);

                        // refresh fill rate
                        int fill_rate = __module__->config.opt_fill_rate.value;
                        if (fabs(pool->bucket.rate - fill_rate) >= 1) {
                                DINFO("fill rate %0.2f => %d\n", pool->bucket.rate, fill_rate);
                                token_bucket_set(&pool->bucket, "balance.pool", fill_rate, fill_rate, fill_rate, 0, 0);
                        }
                }

                // rescan picks up new (and REMCHG) volumes; EPERM means nothing to balance
                ret = pool->scan(pool);
                if (unlikely(ret)) {
                        YASSERT(ret == EPERM);
                        if (pool->done(pool)) {
                                pool->set_status(pool, __S_UNKNOWN__);
                                break;
                        } else {
                                usleep(1000 * 1000);
                                continue;
                        }
                }

                // NOTE(review): status is assigned directly here, bypassing set_status()
                if (recovery_is_running()) {
                        DBUG("recovery is running\n");
                        usleep(1000 * 1000);
                        if (pool->status != __S_STOP__) {
                                pool->status = __S_WAITING_RECOVERY__;
                        }
                        continue;
                } else {
                        if (pool->status != __S_STOP__) {
                                pool->status = __S_RUNNING__;
                        }
                }

                // TODO wait
                if (pool->is_saturated(pool)) {
                        usleep(10 * 1000);
                        continue;
                }

                // deleted-volume check every 10 passes (count is not advanced while it fails)
                if (count % 10 == 0) {
                        // TODO handle a vctl joining
                        ret = __pool_check_deleted(pool, &pool->volume_list);
                        if (unlikely(ret)) {
                                continue;
                        }
                }

                count += 1;

                // EPERM from get(): the pool is already balanced
                ret = nlist->get(nlist);
                if (unlikely(ret)) {
                        if (ret == EPERM) {
                                nlist->dump(nlist);
                                break;
                        } else {
                                DWARN("ret %d\n", ret);
                                usleep(10 * 1000);
                                continue;
                        }
                }

                for (int i=0; i < nlist->node_count; i++) {
                        nlist2[i] = &nlist->nodes[i];
                }

                pool->fs_level = __get_fs_level(nlist2, nlist->node_count);

                DBUG("pool %s status %d level %s average %0.6f count %ju\n",
                     pool->name,
                     pool->status,
                     (pool->fs_level == __FAULT_SET_NODE__ ? "node" : "rack"),
                     pool->average,
                     count);

                // every error branch currently backs off 10ms and retries
                ret = pool->step(pool, __module__->config.step);
                if (unlikely(ret)) {
                        if (ret == ENOENT) {
                                // TODO exiting directly here would stop speed and other stats from being updated
                                // scan done ONLY
                                DBUG("pool %s scan done\n", pool->name);
                                usleep(10 * 1000);
                                continue;
                        } else if (ret == ESTALE){
                                // TODO
                                DWARN("ret %d\n", ret);
                                usleep(10 * 1000);
                                continue;
                        } else {
                                DWARN("ret %d\n", ret);
                                usleep(10 * 1000);
                                continue;
                        }
                }
        }

        // force == 0: actors drain their queues before exiting
        pool->stop_actors(pool, force);
out1:
        pool->set_status(pool, __S_DONE__);

        _gettimeofday(&t2, NULL);
        used = _time_used(&t1, &t2);
        DINFO("pool %s used %0.3f ret %d\n", pool->name, 1.0 * used/USECONDS_PER_SEC, ret);

        return NULL;
}

static void *__actor_proc(void *arg)
{
        int ret;
        actor_t *actor = arg;
        b_pool_t *pool = actor->_pool;
        b_job_t *job = NULL;
        struct timeval t1, t2;
        int64_t used1;

        actor->_status = __TH_RUNNING__;

        while (1) {
                if (actor->_status == __TH_STOP__)
                        break;

                // TODO 空转的actor不能消耗token
#if 0
                if (actor->queue.count <= 0) {
                        usleep(10 * 1000);
                        continue;
                }
#endif

                ret = actor->pop(actor, &job);
                if (unlikely(ret)) {
                        if (ret == ENOENT) {
                                if (actor->_stop_if_enoent)
                                        break;
                        } else {
                                DWARN("pool %s actor %d ret %d\n", pool->name, actor->_idx, ret);
                        }

                        usleep(10 * 1000);
                        continue;
                }

                YASSERT(job != NULL);

                ret = token_bucket_consume_loop(&pool->bucket, 1, 100, 10000);
                if (unlikely(ret)) {
                        ret = actor->push(actor, job);
                        if (unlikely(ret)) {
                                DWARN("ret %d\n", ret);
                                yfree((void **)&job);
                        }
                        continue;
                }

                _gettimeofday(&t1, NULL);

                ret = __job_do(job);
                if (unlikely(ret)) {
                        DWARN("ret %d\n", ret);
                }

                if (ret == 0) {
                        pool->inc_count(pool, __T_COUNT_SUCCESS__, 1);
                } else {
                        pool->inc_count(pool, __T_COUNT_FAIL__, 1);
                }

                _gettimeofday(&t2, NULL);
                used1 = _time_used(&t1, &t2);

                DBUG("pool %s total %ju success %ju fail %ju mq %d hash %d vol "CHKID_FORMAT" chunk "CHKID_FORMAT" used %0.3f\n",
                     pool->name,
                     pool->total,
                     pool->success,
                     pool->fail,
                     actor->queue.count,
                     actor->_idx,
                     CHKID_ARG(&job->parent),
                     CHKID_ARG(&job->chkinfo->id),
                     1.0 * used1 / USECONDS_PER_SEC);

                yfree((void **)&job);
        }

        DBUG("pool %p %s actor %d/%d stopped\n", pool, pool->name, pool->actor_count, actor->_idx);

        actor->_status = __TH_STOPPED__;

        YASSERT(pool->status != __S_DONE__);
        YASSERT(pool->actor_count == __module__->config.actor_num);
        YASSERT(actor->_status == __TH_STOPPED__);

        return NULL;
}

/*
 * Spawn the pool's main balancing thread (__pool_main_thread_proc).
 * That thread posts pool->sem as its first action.
 * Returns the sy_thread_create2() error, or 0.
 */
static int __pool_start_main_pool(b_pool_t *pool)
{
        int ret = sy_thread_create2(__pool_main_thread_proc, (void *)pool, "pool_main_thread");

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Start one __actor_proc() thread for each of the pool's actor_count actors.
 * Always returns 0 (thread creation failure is fatal).
 */
static int __pool_start_actors(b_pool_t *pool)
{
        int ret;
        int idx = 0;

        while (idx < pool->actor_count) {
                ret = sy_thread_create(__actor_proc, &pool->actors[idx]);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);
                idx++;
        }

        return 0;
}

/*
 * Ask every running actor to stop, then block until all report __TH_STOPPED__.
 *
 * @force non-zero: actors leave at their next loop iteration (queued jobs are
 *        abandoned); zero: actors exit once their queue is empty.
 * Always returns 0.
 *
 * NOTE(review): the RUNNING check and the store below are not atomic with
 * the actor's own writes to _status. An actor thread that has not yet set
 * __TH_RUNNING__ is not signalled at all.
 */
static int __pool_stop_actors(b_pool_t *pool, int force)
{
        int waited = 0;

        for (int i = 0; i < pool->actor_count; i++) {
                actor_t *a = &pool->actors[i];

                /* Required: a stopped actor must not be flipped to STOP. */
                if (a->_status != __TH_RUNNING__)
                        continue;

                if (force)
                        a->_status = __TH_STOP__;
                else
                        a->_stop_if_enoent = 1;
        }

        /* Poll once per second. */
        while (!pool->is_actors_stopped(pool)) {
                usleep(1000 * 1000);
                waited++;
                DBUG("pool %s count %d\n", pool->name, waited);
        }

        return 0;
}

/* Return 1 when every in-use actor is __TH_STOPPED__, otherwise 0. */
static int __pool_is_actors_stopped(b_pool_t *pool)
{
        const actor_t *a = pool->actors;
        const actor_t *end = pool->actors + pool->actor_count;

        for (; a < end; a++) {
                if (a->_status != __TH_STOPPED__)
                        return 0;
        }

        return 1;
}

/* Linear search of the pool's volume list (any status); NULL if @chkid is absent. */
static inline b_vol_info_t *__pool_volume_find(b_pool_t *pool, const chkid_t *chkid)
{
        struct list_head *pos, *n;
        b_vol_info_t *found = NULL;

        list_for_each_safe(pos, n, &pool->volume_list.list) {
                b_vol_info_t *cand = list_entry(pos, b_vol_info_t, hook);

                if (!chkid_cmp(chkid, &cand->chkid)) {
                        found = cand;
                        break;
                }
        }

        return found;
}

/*
 * Move @vol to @status and keep pool->volume_count in step with INIT volumes.
 *
 * Entering __S_VOL_INIT always increments the counter, even when the volume
 * is already INIT. The fresh volume from __volume_create() is INIT but not yet
 * counted, which is why that happens. Leaving INIT for another state decrements it.
 */
static void __vol_set_status(b_pool_t *pool, b_vol_info_t *vol, b_vol_status_t status)
{
        if (status == __S_VOL_INIT)
                pool->volume_count++;
        else if (vol->status == __S_VOL_INIT)
                pool->volume_count--;

        vol->status = status;
}

/* Count the pool's volumes that are still in __S_VOL_INIT. */
static inline int __pool_volume_size(b_pool_t *pool)
{
        struct list_head *pos, *n;
        int active = 0;

        list_for_each_safe(pos, n, &pool->volume_list.list) {
                const b_vol_info_t *v = list_entry(pos, b_vol_info_t, hook);

                active += (v->status == __S_VOL_INIT);
        }

        return active;
}

/*
 * disk_maping iterator callback.
 * @_arg is the b_pool_t, @_ent a chunk id, @_pool the entry's pool name.
 *
 * Volumes of this pool are registered via __vol_init(). An entry already
 * tracked is re-initialised only when it is __S_VOL_REMCHG. Errors from
 * __vol_init() (e.g. non-local or deleting volumes) are deliberately ignored.
 */
static void __pool_scan__(void *_arg, void *_ent, void *_pool)
{
        b_pool_t *pool = _arg;
        const chkid_t *chkid = _ent;
        const char *pool_name = _pool;
        b_vol_info_t *known, *created;

        if (!is_volume(chkid) || strcmp(pool->name, pool_name) != 0)
                return;

        known = __pool_volume_find(pool, chkid);
        if (known != NULL && known->status != __S_VOL_REMCHG)
                return;

        (void)__vol_init(&created, known, chkid, pool);
}

/*
 * Walk all "metadata" entries and register this pool's volumes.
 * @return 0 when at least one INIT volume is tracked, EPERM when none.
 */
static int __pool_scan(b_pool_t *pool)
{
        int active;

        disk_maping->iterator("metadata", __pool_scan__, pool);

        active = __pool_volume_size(pool);
        if (active > 0) {
                DBUG("pool %s vol %d/%d/%d\n", pool->name, active, pool->volume_count, pool->volume_list.count);
                YASSERT(active == pool->volume_count);
                return 0;
        }

        DWARN("pool %s nothing to balance\n", pool->name);
        return EPERM;
}

/**
 * Check whether the scanned volumes are being deleted. Any volume being
 * deleted is marked __S_VOL_DELETED so that no further work is done on it.
 *
 * DELETED is terminal here: __pool_scan__ only re-initialises REMCHG entries.
 * Already-deleted volumes are therefore skipped, which avoids repeating the
 * query and the warning on every pass.
 *
 * @return 0 while INIT volumes remain, EPERM when nothing is left to balance.
 */
static int __pool_check_deleted(b_pool_t *pool, count_list_t *volist)
{
        int ret, deleted;
        struct list_head *pos, *n;
        b_vol_info_t *vol;

        list_for_each_safe(pos, n, &volist->list) {
                vol = list_entry(pos, b_vol_info_t, hook);

                if (vol->status == __S_VOL_DELETED)
                        continue;

                /* Reset per volume so a stale result cannot carry over. */
                deleted = 0;
                ret = vol_is_deleting(&vol->chkid, &deleted);
                if (unlikely(ret)) {
                        /* best effort: retry on the next check */
                        continue;
                }

                if (deleted) {
                        DWARN("pool %s vol "CHKID_FORMAT" ret %d, remove this vol!\n",
                              pool->name, CHKID_ARG(&vol->chkid), ret);
                        /* Keep the entry (scan looks it up); only stop counting it. */
                        __vol_set_status(pool, vol, __S_VOL_DELETED);
                }
        }

        int size = __pool_volume_size(pool);
        if (size <= 0) {
                DWARN("pool %s nothing to balance\n", pool->name);
                return EPERM;
        }

        return 0;
}

/*
 * Argument bundle pairing a pool, a volume and a result code.
 * NOTE(review): not referenced in the visible part of this file.
 */
typedef struct {
        b_pool_t *pool;
        b_vol_info_t *vol;
        int ret;
} arg_t;

/*
 * Check whether @chkinfo may be moved.
 *
 * Currently only the replica count is checked. A chunk whose repnum differs
 * from the volume's target repnum is rejected with EAGAIN.
 * TODO: also check the current site/rack localisation.
 *
 * @return 0 when movable, EAGAIN otherwise.
 */
static int __md_chunk_checkinfo(b_vol_info_t *vol, chkinfo_t *chkinfo)
{
        int ret;

        if (vol->repnum != chkinfo->repnum) {
                DINFO(""CHKID_FORMAT" repnum not enough! unable to move!\n", CHKID_ARG(&chkinfo->id));
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Decide whether @node may join @nodeset under @fs_level.
 * At rack level it is rejected when a selected member (bitmap[i] != 0)
 * already sits on the same rack; at node level it is always accepted.
 * Returns 1 if acceptable, 0 otherwise.
 */
static int is_node_ok(nodeset_t *nodeset, b_node_info_t *node, b_faultset_level_t fs_level)
{
        if (fs_level != __FAULT_SET_RACK__)
                return 1;

        for (int i = 0; i < nodeset->node_count; i++) {
                if (!nodeset->bitmap[i])
                        continue;

                if (strcmp(nodeset->nodes[i]->rack, node->rack) == 0)
                        return 0;
        }

        return 1;
}

/*
 * Return 1 if a node with the same nid.id is already listed in the nodeset
 * (regardless of its bitmap bit), 0 otherwise.
 */
static inline int __nodeset_in(const nodeset_t *nodeset, const b_node_info_t *node)
{
        int k;

        for (k = 0; k < nodeset->node_count; k++) {
                if (nodeset->nodes[k]->nid.id == node->nid.id)
                        return 1;
        }

        return 0;
}

/*
 * Pick a destination node for one replica currently on src_node.
 *
 * Nodes already present in the nodeset are skipped. The search gives up
 * (returns -1) at the first remaining candidate whose usage is at or above the
 * pool average, or whose usage gap to src_node is within the configured
 * threshold. A candidate that passes both checks and the fault-set check is
 * stored in *dest_node.
 *
 * NOTE(review): giving up at the first failing candidate only makes sense if
 * pool->node_list.nodes is sorted by ascending percent; confirm with the code
 * that fills the node list.
 *
 * @return 0 with *dest_node set, or -1 with *dest_node == NULL.
 */
static int __replica_select_node(b_pool_t *pool, nodeset_t *nodeset,
                                 const b_node_info_t *src_node, b_node_info_t **dest_node)
{
        b_node_list_t *nlist = &pool->node_list;
        b_node_info_t *cand;
        int idx;

        *dest_node = NULL;

        for (idx = 0; idx < nlist->node_count; idx++) {
                cand = &nlist->nodes[idx];

                /* already holds a replica of this chunk, or was chosen earlier */
                if (__nodeset_in(nodeset, cand))
                        continue;

                if (cand->percent >= pool->average) {
                        DBUG("pool %s node %s:%f > average %f\n",
                             pool->name, cand->node, cand->percent, pool->average);
                        return -1;
                }

                /* percent is compared as a fraction; threshold is in whole percent */
                if ((src_node->percent - cand->percent) * 100 <= __module__->config.threshold) {
                        DBUG("pool %s src %s:%0.6f node %s:%0.6f < %0.6f\n",
                             pool->name,
                             src_node->node,
                             src_node->percent,
                             cand->node,
                             cand->percent,
                             __module__->config.threshold / 100.0);
                        return -1;
                }

                /* same trace whether or not the fault-set (rack) check passes */
                DBUG("pool %s fs_level %d node %s/%s\n",
                     pool->name, pool->fs_level, cand->rack, cand->node);

                if (is_node_ok(nodeset, cand, pool->fs_level)) {
                        *dest_node = cand;
                        return 0;
                }
        }

        DWARN("unable to find suitable node for replica!\n");
        return -1;
}

/*
 * Per-chunk callback invoked by volume_ctl_chunk_iterator2_with_cursor()
 * (see __vol_pop()).
 *
 * For one chunk it tries to find, for each replica, a less-used destination
 * node via __replica_select_node(). If at least one replica gets a new
 * destination, a job is built from the resulting nodeset and pushed to the
 * pool. Otherwise the chunk is only counted as checked (__T_COUNT_CHECK__).
 *
 * @param _arg    arg_t context. The callback returns void, so errors are
 *                reported by storing them in arg->ret.
 * @param _parent chkid of the volume owning the chunk; asserted equal to
 *                arg->vol->chkid
 * @param _ent    chkinfo_t of the chunk being examined
 */
static void __check_raw_callback(void *_arg, void *_parent, void *_ent)
{
        int i, ret, replica_changed = 0;
        reploc_t *diskid;
        nodeset_t nodeset;
        chkinfo_t *chkinfo;
        b_node_list_t *nlist;
        b_node_info_t *src_node, *dest_node;
        arg_t *arg = _arg;

        chkid_t *parent = _parent;
        b_pool_t *pool = arg->pool;
        b_vol_info_t *vol = arg->vol;
        chkinfo = (chkinfo_t *)_ent;

        DBUG("pool %s vol "CHKID_FORMAT" chunk "CHKID_FORMAT"\n",
              pool->name, CHKID_ARG(&vol->chkid), CHKID_ARG(&chkinfo->id));

        YASSERT(chkid_cmp(parent, &vol->chkid) == 0);

        // NOTE(review): presumably timing instrumentation; the error paths below
        // jump past ANALYSIS_END.
        ANALYSIS_BEGIN(0);

#if 1
        // Skip chunks whose replica count does not match the volume (EAGAIN).
        ret = __md_chunk_checkinfo(vol, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
#endif

        nlist = &pool->node_list;

        // Build the nodeset from the chunk's current replica locations. Every
        // replica node must be present in the pool node list (ENONET otherwise).
        memset(&nodeset, 0, sizeof(nodeset_t));
        for(i = 0; i < chkinfo->repnum; i++) {
                diskid = &chkinfo->diskid[i];
                src_node = nlist->nid_map[diskid->id.id];
                if (NULL == src_node) {
                        DWARN(""CHKID_FORMAT" rep [%d], node not in nodelist, node offline?\n",
                              CHKID_ARG(&chkinfo->id), i);
                        ret = ENONET;
                        GOTO(err_ret, ret);
                }

                YASSERT(0 == nid_cmp(&diskid->id, &src_node->nid));

                nodeset.nodes[i] = src_node;
                nodeset.bitmap[i] = 1;
                nodeset.node_count++;
        }

        // Sort so that replicas on nodes with higher disk usage are moved first
        // (assumes percent_cmp orders by descending percent; confirm). All bitmap
        // bits are 1 at this point, so reordering nodes alone keeps them consistent.
        qsort(nodeset.nodes, nodeset.node_count, sizeof(b_node_info_t *), percent_cmp);

        for (i = 0; i < chkinfo->repnum; i++) {
                src_node = nodeset.nodes[i];

#if ENABLE_BLC_LOCAL_REP
                // skip local node
                if (vol->localize && net_islocal(&src_node->nid)) {
                        DBUG("src %s/%s\n", src_node->rack, src_node->node);
                        continue;
                }
#endif

                DBUG(""CHKID_FORMAT" replica %d src_node %s percent %0.6f node %d\n",
                     CHKID_ARG(&chkinfo->id),
                     i,
                     src_node->node,
                     src_node->percent,
                     pool->node_list.node_count);

                // Clear this replica's bit so the rack check in is_node_ok()
                // ignores it while choosing a destination. Restored if none is found.
                nodeset.bitmap[i] = 0;

                ret = __replica_select_node(pool, &nodeset, src_node, &dest_node);
                if (unlikely(ret)) {
                        nodeset.bitmap[i] = 1;
                } else {
                        YASSERT(dest_node != NULL);
                        replica_changed++;

                        // Append the destination after the original replicas; bitmap[i]
                        // stays 0 (presumably read by __job_create as "move this one").
                        nodeset.bitmap[nodeset.node_count] = 1;
                        nodeset.nodes[nodeset.node_count] = dest_node;
                        nodeset.node_count++;

                        DBUG("replica "CHKID_FORMAT" rep idx:[%d], from %s -> %s\n",
                             CHKID_ARG(&chkinfo->id), i, src_node->node, network_rname(&dest_node->nid));
                }
        }

        ANALYSIS_END(0, 1000*100, NULL);

#if DRY_RUN
        replica_changed = 1;
#endif

        if (replica_changed) {
                b_job_t *job;

                ret = __job_create(&job, parent, chkinfo, &nodeset);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                ret = pool->push(pool, job);
                if (unlikely(ret)) {
                        // NOTE(review): frees only the job struct; confirm __job_create
                        // allocates nothing else that would leak here.
                        yfree((void **)&job);
                        GOTO(err_ret, ret);
                }
        } else {
                // Nothing to move: only account the chunk as checked.
                ret = pool->inc_count(pool, __T_COUNT_CHECK__, 1);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);
        }

        return;
err_ret:
        ((arg_t *)_arg)->ret = ret;
        return;
}

/*
 * Take the next chunk index from the volume's generator and run
 * __check_raw_callback() on that chunk.
 *
 * @return 0 on success. Otherwise returns the generator error (ENOENT when
 *         the volume is exhausted), the iterator error, or the error the
 *         callback stored in its arg_t.
 */
static int __vol_pop(b_vol_info_t *vol, b_pool_t *pool)
{
        arg_t ctx;
        chkid_t rawid;
        uint32_t idx;
        uint64_t cursor;
        int ret;

        ret = chkid_generator(&vol->gen, &idx);
        if (unlikely(ret))
                GOTO(err_ret, ret);     /* ENOENT: volume iteration finished */

        cursor = idx;
        fid2cid(&rawid, &vol->chkid, cursor);

        DBUG("pool %s vol "CHKID_FORMAT" size %ju idx %ju\n",
             pool->name, CHKID_ARG(&rawid), vol->size, cursor);

        ctx.pool = pool;
        ctx.vol = vol;
        ctx.ret = 0;

        ret = volume_ctl_chunk_iterator2_with_cursor(&vol->chkid, vol->size, 1, 0, &cursor,
                                                     __check_raw_callback, &ctx);
        if (unlikely(ret)) {
                /* e.g. ESTALE / EREMCHG: the caller decides what to do */
                DWARN(""CHKID_FORMAT" ret %d\n", CHKID_ARG(&vol->chkid), ret);
                GOTO(err_ret, ret);
        }

        /* the callback returns void and reports failures through ctx.ret */
        ret = ctx.ret;
        if (unlikely(ret)) {
                DWARN(""CHKID_FORMAT" ret %d\n", CHKID_ARG(&vol->chkid), ret);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Report whether too many jobs are in flight for the pool's actors.
 *
 * In-flight jobs are those pushed but not yet finished, where "finished"
 * counts both successes and failures, as in __pool_done(). Counting failures
 * as in flight would leave the pool saturated forever after
 * actor_count * 1000 failures.
 *
 * @return non-zero when more than actor_count * 1000 jobs are outstanding.
 */
static int __pool_is_saturated(b_pool_t *pool)
{
        int ret;
        uint64_t finished, inflight;

        ret = sy_rwlock_rdlock(&pool->count_lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        finished = pool->success + pool->fail;
        /* A job may be counted as finished before its push is counted; avoid unsigned wrap. */
        inflight = (pool->total > finished) ? pool->total - finished : 0;

        sy_rwlock_unlock(&pool->count_lock);

        return inflight > (uint64_t)pool->actor_count * 1000;
}

/*
 * Pop chunks round-robin from the pool's volumes in __S_VOL_INIT state until
 * at least `step` chunks were processed or no such volume remains.
 *
 * Per-volume results:
 *   ENOENT          -> the volume is fully scanned: __S_VOL_DONE
 *   EREMCHG/ESTALE  -> the volume changed under us: __S_VOL_REMCHG
 *   EAGAIN          -> the chunk was skipped, keep going
 *   anything else   -> abort and return that error
 *
 * Each pass visits every remaining volume, so the processed count may exceed
 * `step`. That still counts as a completed step.
 *
 * @return 0 when at least `step` chunks were processed, ENOENT when the
 *         volumes ran out first, or the fatal error.
 */
static int __pool_step(b_pool_t *pool, int step)
{
        int ret, _count = 0, empty = 0;
        struct list_head *pos, *n;
        b_vol_info_t *vol;

        while (_count < step) {
                empty = 1;
                list_for_each_safe(pos, n, &pool->volume_list.list) {
                        vol = (b_vol_info_t *)pos;
                        if (vol->status != __S_VOL_INIT)
                                continue;

                        empty = 0;

                        ret = __vol_pop(vol, pool);
                        if (unlikely(ret)) {
                                DINFO("pool %s vol "CHKID_FORMAT" ret %d\n",
                                      pool->name, CHKID_ARG(&vol->chkid), ret);
                                if (ret == ENOENT) {
                                        __vol_set_status(pool, vol, __S_VOL_DONE);
                                } else if (ret == EREMCHG || ret == ESTALE) {
                                        /* previously ESTALE never reached here and aborted the step */
                                        DWARN("pool %s vol "CHKID_FORMAT" ret %d\n",
                                              pool->name, CHKID_ARG(&vol->chkid), ret);
                                        __vol_set_status(pool, vol, __S_VOL_REMCHG);
                                } else if (ret != EAGAIN) {
                                        GOTO(err_ret, ret);
                                }
                                continue;
                        }

                        _count++;
                }

                if (unlikely(empty)) {
                        break;
                }
        }

        /* >=, not ==: a full pass over the volumes can overshoot step */
        return (_count >= step) ? 0 : ENOENT;
err_ret:
        return ret;
}

/*
 * Hand a job to one of the pool's actors, chosen round-robin via pool->hash,
 * then count it as pushed (__T_COUNT_PUSH__).
 *
 * @return 0 on success, or the actor's push error.
 */
static int __pool_push(b_pool_t *pool, b_job_t *job)
{
        int ret;
        int slot = (pool->hash++) % pool->actor_count;
        actor_t *actor;

        DBUG("pool %s total %ju success %ju hash %d vol "CHKID_FORMAT" chunk "CHKID_FORMAT"\n",
             pool->name, pool->total, pool->success, slot, CHKID_ARG(&job->parent), CHKID_ARG(&job->chkinfo->id));

        actor = &pool->actors[slot];

        ret = actor->push(actor, job);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (unlikely(pool->inc_count(pool, __T_COUNT_PUSH__, 1)))
                UNIMPLEMENTED(__DUMP__);

        return 0;
err_ret:
        return ret;
}

/*
 * Add n to the pool counter selected by t, under count_lock.
 * Unknown counter types are ignored. Always returns 0.
 */
static int __pool_inc_count(b_pool_t *pool, int t, int n)
{
        if (unlikely(sy_rwlock_wrlock(&pool->count_lock)))
                UNIMPLEMENTED(__DUMP__);

        switch (t) {
        case __T_COUNT_CHECK__:
                pool->check += n;
                break;
        case __T_COUNT_PUSH__:
                pool->total += n;
                break;
        case __T_COUNT_SUCCESS__:
                pool->success += n;
                break;
        case __T_COUNT_FAIL__:
                pool->fail += n;
                break;
        default:
                break;
        }

        sy_rwlock_unlock(&pool->count_lock);

        return 0;
}

/*
 * Return non-zero once every pushed job has finished, whether it succeeded
 * or failed. The counters are read under count_lock.
 */
static int __pool_done(b_pool_t *pool)
{
        int finished;

        if (unlikely(sy_rwlock_rdlock(&pool->count_lock)))
                UNIMPLEMENTED(__DUMP__);

        finished = (pool->success + pool->fail == pool->total);

        sy_rwlock_unlock(&pool->count_lock);

        return finished;
}

/* Wakeup hook of the pool method table; currently a no-op that returns 0. */
static int __pool_wakeup(b_pool_t *pool)
{
        (void)pool; /* unused for now */

        return 0;
}

/*
 * Map a balancer status to the short name used in logs and in the
 * "balance/<pool>/info" file. Unlisted values map to "unknown".
 */
static const char *__status2str__(b_status_t status)
{
        switch (status) {
        case __S_SCAN__:
                return "scan";
        case __S_RUNNING__:
                return "running";
        case __S_WAITING__:
                return "waiting";
        case __S_WAITING_RECOVERY__:
                return "waiting_recovery";
        case __S_STOPPED__:
                return "stopped";
        case __S_STOP__:
                return "stop";
        case __S_FAILED__:
                return "failed";
        case __S_DONE__:
                return "done";
        default:
                return "unknown";
        }
}

/**
 * Update a pool's status and publish its configuration and counters to the
 * nodectl file "balance/<pool>/info".
 *
 * @param pool  pool to update
 * @param new   new status; __S_UNKNOWN__ keeps the current status and only
 *              refreshes the published info
 * @return 0 on success, ENAMETOOLONG if the info key does not fit in its
 *         buffer, or the error from taking dump_lock.
 */
static int __pool_set_status(b_pool_t *pool, b_status_t new)
{
        int ret, len;
        char key[MAX_NAME_LEN];
        char str[MAX_BUF_LEN];
        const b_config_t *config = &__module__->config;

        DBUG("pool %s status %s -> %s volume %d node %d check %ju total %ju success %ju fail %ju speed %ju\n",
             pool->name,
             __status2str__(pool->status),
             __status2str__(new),
             pool->volume_count,
             pool->node_list.node_count,
             pool->check,
             pool->total,
             pool->success,
             pool->fail,
             pool->speed);

        if (new != __S_UNKNOWN__) {
                pool->status = new;
        }

        /* Bound by the real buffer size: str is MAX_BUF_LEN bytes, not MAX_PATH_LEN. */
        snprintf(str, sizeof(str),
                 "status:%s\n"
                         "interval:%d\n"
                         "threshold:%d\n"
                         "thread:%d\n"
                         "step:%d\n"
                         "fill_rate:%d\n"
                         "volume:%d\n"
                         "node:%d\n"
                         "check:%ju\n"
                         "total:%ju\n"
                         "success:%ju\n"
                         "fail:%ju\n"
                         "speed:%ju\n",
                 __status2str__(pool->status),
                 config->interval,
                 config->threshold,
                 config->actor_num,
                 config->step,
                 config->opt_fill_rate.value,
                 pool->volume_count,
                 pool->node_list.node_count,
                 pool->check,
                 pool->total,
                 pool->success,
                 pool->fail,
                 pool->speed);

        /* A truncated key would publish under the wrong path; reject it instead. */
        len = snprintf(key, sizeof(key), "balance/%s/info", pool->name);
        if (len < 0 || (size_t)len >= sizeof(key)) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret = sy_rwlock_wrlock(&pool->dump_lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        __config_set(key, str);

        sy_rwlock_unlock(&pool->dump_lock);

        return 0;
err_ret:
        return ret;
}

/**
 * Allocate and initialise a balance pool object.
 *
 * This sets up:
 * - the node-list callbacks and the empty volume list;
 * - config.actor_num actors;
 * - the counters, semaphore and locks;
 * - the fill-rate token bucket and the pool method table.
 *
 * The pool starts in __S_STOPPED__ with node-level fault sets. The caller
 * starts it via start_main_thread().
 *
 * @param pool       out: the new pool, NULL on failure
 * @param pool_name  pool name; must fit in b_pool_t::name including the NUL
 * @return 0 on success, ENAMETOOLONG if the name does not fit, or the
 *         ymalloc() error.
 */
static int __pool_create(b_pool_t **pool, const char *pool_name)
{
        int ret;
        b_pool_t *_pool;
        actor_t *actor;

        *pool = NULL;

        ret = ymalloc((void **)&_pool, sizeof(b_pool_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(_pool, 0x0, sizeof(b_pool_t));

        /* name is a fixed-size array: refuse names that would overflow it */
        if (strlen(pool_name) >= sizeof(_pool->name)) {
                ret = ENAMETOOLONG;
                GOTO(err_free, ret);
        }
        strcpy(_pool->name, pool_name);

        b_node_list_t *nlist = &_pool->node_list;
        nlist->_pool = _pool;

        nlist->get = __node_list_get;
        nlist->reset = __node_list_reset;
        nlist->dump = __node_list_dump;

        count_list_init(&_pool->volume_list);
        _pool->volume_count = 0;

        YASSERT(__module__->config.actor_num <= ACTOR_MAX);

        _pool->actor_count = __module__->config.actor_num;
        for (int i=0; i < _pool->actor_count; i++) {
                actor = &_pool->actors[i];
                actor->_inited = 0;
                actor->_idx = i;
                actor->_status = __TH_STOPPED__;
                actor->_stop_if_enoent = 0;
                actor->_pool = _pool;

                actor->init = __actor_init;
                actor->push = __actor_push;
                actor->pop = __actor_pop;

                actor->init(actor);
        }

        _pool->status = __S_STOPPED__;
        _pool->fs_level = __FAULT_SET_NODE__;
        _pool->hash = 0;

        _pool->total = 0;
        _pool->success = 0;
        _pool->fail = 0;

        timerange1_init(&_pool->trange, "balance", 1000 * 1000);

        ret = sem_init(&_pool->sem, 0, 0);
        if (ret < 0)
                UNIMPLEMENTED(__DUMP__);

        ret = sy_rwlock_init(&_pool->count_lock, "total");
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        ret = sy_rwlock_init(&_pool->dump_lock, "dump");
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        int fill_rate = __module__->config.opt_fill_rate.value;
        token_bucket_init(&_pool->bucket, "balance.pool", fill_rate, fill_rate, fill_rate, 0, 0);

        // ops
        _pool->destroy = __pool_destroy;

        _pool->start_main_thread = __pool_start_main_pool;

        _pool->start_actors = __pool_start_actors;
        _pool->stop_actors = __pool_stop_actors;
        _pool->is_actors_stopped = __pool_is_actors_stopped;

        _pool->scan = __pool_scan;

        _pool->is_saturated = __pool_is_saturated;
        _pool->step = __pool_step;
        _pool->push = __pool_push;

        _pool->inc_count = __pool_inc_count;
        _pool->done = __pool_done;

        _pool->set_status = __pool_set_status;
        _pool->wakeup = __pool_wakeup;

        *pool = _pool;

        DDEV("pool %s %p\n", _pool->name, _pool);
        return 0;
err_free:
        yfree((void **)&_pool);
err_ret:
        return ret;
}

// @class module

/*
 * Park the balancer until the configured interval elapses or __module__->sem
 * is posted (e.g. by __balance_runlevel on a start request). Then decide
 * whether a balance round may run.
 *
 * @return 0 with the module status set to __S_RUNNING__, or EAGAIN while
 *         recovery is running.
 */
static int module_sleep()
{
        int ret;

        DINFO("sleep\n");
        __module__->status = __S_STOPPED__;

        ret = _sem_timedwait1(&__module__->sem, __module__->config.interval);
        if (ret == ETIMEDOUT)
                DINFO("interval %d\n", __module__->config.interval);
        else if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        /* never balance concurrently with recovery */
        if (recovery_is_running()) {
                DWARN("recovery is running\n");
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        DINFO("run\n");
        __module__->status = __S_RUNNING__;
        return 0;
err_ret:
        return ret;
}

/* Accumulator passed to __pool_inspect() while walking the pool hash table. */
typedef struct {
        /* number of visited pools whose status is not __S_DONE__ */
        int running_pool;

        /* when set, every visited pool is appended here */
        struct list_head *balance_pool_list;

        /* when set, pools missing from system_pool_list are appended here */
        struct list_head *deleted_pool_list;
        /* pool names currently reported by system_pool_list() */
        struct list_head *system_pool_list;
} __pool_inspect_arg_t;

/* Reset an inspect context: no running pools counted and no target lists set. */
static inline void __pool_inspect_init(__pool_inspect_arg_t *inspect)
{
        *inspect = (__pool_inspect_arg_t) {
                .running_pool = 0,
                .balance_pool_list = NULL,
                .deleted_pool_list = NULL,
                .system_pool_list = NULL,
        };
}

/*
 * hash_iterate_table_entries() callback over the module's pools.
 *
 * It counts pools that are not __S_DONE__ and, depending on which list is set
 * in the context, appends each pool to either balance_pool_list or
 * deleted_pool_list. A pool goes to deleted_pool_list only if it is no longer
 * in system_pool_list.
 *
 * @param arg1  __pool_inspect_arg_t context
 * @param arg2  b_pool_t being visited
 * @return always 0
 */
static int __pool_inspect(void *arg1, void *arg2)
{
        __pool_inspect_arg_t *ctx = arg1;
        b_pool_t *pool = arg2;

        DBUG("pool %s status %s\n", pool->name, __status2str__(pool->status));

        ctx->running_pool += (pool->status != __S_DONE__) ? 1 : 0;

        /* A pool has a single hook, so it can only be queued on one of the two lists. */
        if (ctx->balance_pool_list != NULL) {
                YASSERT(ctx->deleted_pool_list == NULL);
                list_add_tail(&pool->hook, ctx->balance_pool_list);
        }

        if (ctx->deleted_pool_list == NULL)
                return 0;

        YASSERT(ctx->system_pool_list != NULL);
        YASSERT(ctx->balance_pool_list == NULL);

        if (!system_pool_list_exists(ctx->system_pool_list, pool->name))
                list_add_tail(&pool->hook, ctx->deleted_pool_list);

        return 0;
}

/*
 * Block until every pool of the current round reaches __S_DONE__.
 *
 * Pool states are polled once per second. Every 10 rounds the list of pools
 * known to the system is refreshed; pools that have disappeared from it are
 * asked to stop (__S_STOP__).
 */
static void __module_wait()
{
        int ret, count = 0;
        __pool_inspect_arg_t inspect;
        struct list_head deleted_pools;
        struct list_head system_pools;
        struct list_head *pos, *n;
        b_pool_t *pool;

        INIT_LIST_HEAD(&deleted_pools);
        INIT_LIST_HEAD(&system_pools);

        while(1) {
                if (count % 10 == 0) {
                        list_free(&system_pools, yfree);

                        YASSERT(list_empty_careful(&system_pools));

                        ret = system_pool_list(&system_pools);
                        if (unlikely(ret)) {
                                DWARN("ret %d\n", ret);
                                /*
                                 * Back off before retrying. count is unchanged, so the
                                 * refresh is attempted again next round; without the
                                 * sleep this path spins the CPU.
                                 */
                                usleep(1000 * 1000);
                                continue;
                        }
                }

                YASSERT(list_empty_careful(&deleted_pools));

                __pool_inspect_init(&inspect);
                inspect.deleted_pool_list = &deleted_pools;
                inspect.system_pool_list = &system_pools;

                // TODO check deleted pools, and gc
                hash_iterate_table_entries(__module__->pools, __pool_inspect, &inspect);

                DBUG("system pool %d pool %d/%d/%d count %d\n",
                     list_size(&system_pools),
                     __module__->pools->num_of_entries,
                     inspect.running_pool,
                     list_size(&deleted_pools),
                     count);

                /* pools removed from the system: ask them to stop */
                list_for_each_safe(pos, n, &deleted_pools) {
                        pool = list_entry(pos, b_pool_t, hook);
                        list_del_init(pos);

                        DWARN("pool %s deleted\n", pool->name);
                        pool->status = __S_STOP__;
                }

                if (inspect.running_pool == 0) {
                        list_free(&system_pools, yfree);
                        break;
                }

                usleep(1000 * 1000);
                count += 1;
        }
}

static void __module_reset_pools()
{
        int ret;
        __pool_inspect_arg_t inspect;
        struct list_head balance_pools;
        struct list_head *pos, *n;
        b_pool_t *pool, *pool2;

        INIT_LIST_HEAD(&balance_pools);

        __pool_inspect_init(&inspect);
        inspect.balance_pool_list = &balance_pools;

        hash_iterate_table_entries(__module__->pools, __pool_inspect, &inspect);

        list_for_each_safe(pos, n, &balance_pools) {
                pool = list_entry(pos, b_pool_t, hook);
                list_del_init(pos);

                ret = hash_table_remove(__module__->pools, pool->name, (void **)&pool2);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                YASSERT(pool2 == pool);
                YASSERT(pool->status == __S_DONE__);

                DBUG("pool %s %p\n", pool->name, pool);

                pool->destroy(pool);
                yfree((void **)&pool);
        }

        // hash_destroy_table(__module__->pools, NULL);
}

/*
 * Main loop of the balancer thread. Each round it:
 * 1. sleeps (module_sleep);
 * 2. reloads the configuration;
 * 3. creates and starts one pool per system pool;
 * 4. waits for all pools to finish, then tears them down.
 *
 * It never returns.
 */
static void *__module_main_thread_proc(void *arg)
{
        int ret;
        struct list_head pool_list;
        struct list_head *pos, *n;
        pool_name_t *pool_name;
        b_pool_t *pool;
        struct timeval t1, t2;
        int64_t used;

        (void) arg;

        while (1) {
                ret = module_sleep();
                if (unlikely(ret)) {
                        usleep(1000 * 1000);
                        continue;
                }

                _gettimeofday(&t1, NULL);

                __module__->load_config(__module__, 1);

                YASSERT(__module__->status == __S_RUNNING__);
                YASSERT(__module__->pools->num_of_entries == 0);

                ret = system_pool_list(&pool_list);
                if (unlikely(ret)) {
                        DWARN("ret %d\n", ret);
                        continue;
                }

                list_for_each_safe(pos, n, &pool_list) {
                        pool_name = list_entry(pos, pool_name_t, hook);

                        ret = __pool_create(&pool, pool_name->pool);
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);

                        pool->start_main_thread(pool);

                        /*
                         * Presumably posted by the pool thread once it has started.
                         * sem_wait() returns -1/errno and may be interrupted by a
                         * signal (EINTR); retry in that case instead of aborting.
                         */
                        do {
                                ret = sem_wait(&pool->sem);
                        } while (ret == -1 && errno == EINTR);
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);

                        ret = hash_table_insert(__module__->pools, pool, pool->name, 0);
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);
                }

                list_free(&pool_list, yfree);

                __module_wait();
                __module_reset_pools();

                _gettimeofday(&t2, NULL);
                used = _time_used(&t1, &t2);

                /* used is signed 64-bit: print it with a matching conversion */
                DINFO("used %lld\n", (long long)(used / USECONDS_PER_SEC));
        }

        YASSERT(1);
        return NULL;
}

/*
 * Change callback for the "balance/runlevel" nodectl option.
 *
 * START wakes the sleeping balancer; STOP asks running pools to stop. For any
 * value other than OFF, the option is then reset to "0". Always returns 0.
 */
static int __balance_runlevel(void *context, uint32_t mask)
{
        nodectl_option_t *opt = context;
        nodectl_file_t *pnf = &opt->nf;
        int runlevel = opt->value;

        DINFO("runlevel %d status %d mask %u\n", runlevel, __module__->status, mask);

        switch (runlevel) {
        case __RL_OFF__:
                return 0;
        case __RL_START__:
                if (__module__->status != __S_STOPPED__)
                        DWARN("balancer is running\n");
                else
                        sem_post(&__module__->sem);
                break;
        case __RL_STOP__:
                if (__module__->status == __S_RUNNING__)
                        __module__->stop();
                break;
        default:
                break;
        }

        pnf->set2(pnf, "0");

        return 0;
}

/* Hash-table key function: hash the pool name string. */
static uint32_t __pool_key(const void *i)
{
        const char *name = i;

        return hash_str((char *)name);
}

/* Hash-table compare function: v1 is a stored b_pool_t, v2 a pool name (strcmp semantics). */
static int __pool_cmp(const void *v1, const void *v2)
{
        return strcmp(((const b_pool_t *)v1)->name, (const char *)v2);
}

static int __module_stop()
{
        __pool_inspect_arg_t inspect;
        struct list_head balance_pools;
        struct list_head *pos, *n;
        b_pool_t *pool;

        DINFO("stop\n");

        INIT_LIST_HEAD(&balance_pools);

        __pool_inspect_init(&inspect);
        inspect.balance_pool_list = &balance_pools;

        hash_iterate_table_entries(__module__->pools, __pool_inspect, &inspect);

        list_for_each_safe(pos, n, &balance_pools) {
                pool = list_entry(pos, b_pool_t, hook);
                list_del_init(pos);

                DINFO("stop pool %s status %s\n", pool->name, __status2str__(pool->status));
                if (pool->status == __S_RUNNING__ || pool->status == __S_WAITING_RECOVERY__) {
                        pool->status = __S_STOP__;
                }
        }

        return 0;
}

/*
 * Reload the balancer tunables from the nodectl files under "balance/",
 * clamp each one to its valid range, and optionally log the result.
 *
 * fill_rate is not read here (it has its own nodectl option), but its
 * current value is clamped as well. Always returns 0.
 */
static int __module_load_config(b_module_t *module, int log)
{
        nodectl_file_t nf;
        nodectl_file_t *pnf = &nf;
        b_config_t *cfg = &module->config;

        opt_data_init(pnf, "balance/interval");
        cfg->interval = pnf->get_int(pnf, 3600 * 24 * 365);
        itorange(&cfg->interval, 600, 3600 * 24 * 365);

        opt_data_init(pnf, "balance/threshold");
        cfg->threshold = pnf->get_int(pnf, 3);
        itorange(&cfg->threshold, 1, 99);

        opt_data_init(pnf, "balance/thread");
        cfg->actor_num = pnf->get_int(pnf, 10);
        itorange(&cfg->actor_num, 1, 10);

        opt_data_init(pnf, "balance/step");
        cfg->step = pnf->get_int(pnf, 1000);
        itorange(&cfg->step, 1, INT_MAX);

        itorange(&cfg->opt_fill_rate.value, 0, INT_MAX);

        if (log) {
                DINFO("interval %d threshold %d actor %d step %d fill_rate %d\n",
                      cfg->interval,
                      cfg->threshold,
                      cfg->actor_num,
                      cfg->step,
                      cfg->opt_fill_rate.value);
        }

        return 0;
}

/*
 * Allocate and initialise the balancer module.
 *
 * It sets up:
 * - the wakeup semaphore;
 * - the "balance/runlevel" and "balance/fill_rate" nodectl options;
 * - the pool hash table and the stop/load_config ops.
 *
 * @param module  out: the new module, NULL on failure
 * @return 0 on success, the runlevel option's start error, or ENOMEM.
 */
static int __module_create(b_module_t **module)
{
        int ret;
        b_module_t *_module;
        nodectl_option_t *opt;

        *module = NULL;

        ret = ymalloc((void **)&_module, sizeof(b_module_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(_module, 0x0, sizeof(b_module_t));

        ret = sem_init(&_module->sem, 0, 0);
        if (ret < 0) {
                ret = errno;
                UNIMPLEMENTED(__DUMP__);
        }

        opt = &_module->opt_runlevel;
        opt_nodectl_init2(opt);
        /* capture the result: the check below previously tested sem_init's stale ret */
        ret = opt->start(opt, "balance/runlevel", "0", __balance_runlevel);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        _module->pools = hash_create_table(__pool_cmp, __pool_key, "balance.pools");
        if (_module->pools == NULL) {
                ret = ENOMEM;
                DERROR("ret (%d) %s\n", ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        _module->status = __S_STOPPED__;

        opt = &_module->config.opt_fill_rate;

        opt_data_init2(opt);
        opt->start(opt, "balance/fill_rate", "100", NULL);

        // ops
        _module->stop = __module_stop;
        _module->load_config = __module_load_config;

        *module = _module;
        return 0;
err_ret:
        return ret;
}

/*
 * Public entry point of the pool balancer: create the module, load its
 * configuration (with logging), and start the "balance_pool" main thread.
 *
 * @return 0 on success, or the module-creation / thread-creation error.
 */
int balance_pool_init()
{
        int ret = __module_create(&__module__);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        __module__->load_config(__module__, 1);

        ret = sy_thread_create2(__module_main_thread_proc, NULL, "balance_pool");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
